Conversation
S0okJu
left a comment
There was a problem hiding this comment.
security group rule 관련 도구 제작해주셔서 감사합니다.
| extracted: list[str] = [] | ||
| for r in rules: | ||
| rid = ( | ||
| r.get("id") |
There was a problem hiding this comment.
r 객체에 id가 없으면 속성으로 추출하는 예외처리를 하신 이유가 있으신가요?
There was a problem hiding this comment.
해당부분은 아래 @halucinor 님 피드백 반영하면서 정리되었습니다~
| "direction": direction, | ||
| "ethertype": ethertype, | ||
| } | ||
| args["protocol"] = protocol |
There was a problem hiding this comment.
위의 필수 인자처럼 args 딕셔너리 내에서 처리하지 않은 이유가 있나요?
There was a problem hiding this comment.
이부분은 통일해서 dict로 정리하겠습니다!
| mcp.tool()(self.create_security_group_rule) | ||
| mcp.tool()(self.get_security_group_rule_detail) | ||
| mcp.tool()(self.delete_security_group_rule) | ||
| mcp.tool()(self.create_security_group_rules_bulk) |
There was a problem hiding this comment.
create_security_group_rules_bulk 도구를 제작하신 이유가 있으신가요?
There was a problem hiding this comment.
리소스를 다루는 API를 만들때 벌크요청이 있으면 인터페이스를 같이 올려주는게 좋을것 같아서 추가했습니다.
보안그룹 규칙의 예를 들어.. 프롬프팅을 아래와 같이 한다면 한번에 여러 규칙을 추가할 수 있으니까요
test-security 보안그룹에 HTTP, HTTPS, SSH의 인바운드를 허용하고 ICMP를 양방향 허용하는 규칙을 추가해줘
무엇보다 SDK에 구현이 되어있어서 가져왔습니다 😋
There was a problem hiding this comment.
상기 답변으로 충분하다고 생각되는데 혹시 다른 이유가 더 필요할까요?
벌크요청을 하면 단건 호출 대비 효율성이 좋고 동일한 파라미터로 요청할 경우 휴먼에러를 방지할 수 있습니다. 오픈스택에는 보안그룹 규칙 외에도 아래와 같이 자체적으로 여러개의 리소스를 다룰 수 있는 API가 구현되어 있습니다.. 일단 제가 당장 생각나는건 이거 두가지네요.
| updated = conn.network.update_port(port_id, **update_args) | ||
| return self._convert_to_port_model(updated) | ||
|
|
||
| def add_security_group_to_port( |
There was a problem hiding this comment.
security group에 port를 추가하는 도구인 것 같아요.
함수명을 add_port_in_security_group(예시) 이런식으로 변경하는 것은 어떨까요?
There was a problem hiding this comment.
그러네요 원래 다른 이름이였는데 서순 바꾸다가 미처 못보고 넘어간 이름이네요
변경하도록 하겠습니다
| args: dict = { | ||
| "security_group_id": security_group_id, | ||
| "direction": direction, | ||
| "ethertype": ethertype, | ||
| } | ||
| args["protocol"] = protocol | ||
| args["port_range_min"] = port_range_min | ||
| args["port_range_max"] = port_range_max | ||
| args["remote_ip_prefix"] = remote_ip_prefix | ||
| args["remote_group_id"] = remote_group_id | ||
| args["description"] = description | ||
| args["project_id"] = project_id |
There was a problem hiding this comment.
❓ 인자가 None일때 의도하지 않게 설정이 없어질 수 있는 케이스는 없을까요?
There was a problem hiding this comment.
이부분은 주어진 파라미터를 다시 정제해서 사용하도록 개선해보겠습니다~
| def _convert_to_security_group_rule_model( | ||
| self, openstack_rule | ||
| ) -> SecurityGroupRule: | ||
| """ | ||
| Convert an OpenStack Security Group Rule object to a pydantic model. | ||
|
|
||
| :param openstack_rule: OpenStack rule object | ||
| :return: SecurityGroupRule model | ||
| """ | ||
| return SecurityGroupRule( | ||
| id=getattr(openstack_rule, "id"), | ||
| name=getattr(openstack_rule, "name", None), | ||
| status=getattr(openstack_rule, "status", None), | ||
| description=getattr(openstack_rule, "description", None), | ||
| project_id=getattr(openstack_rule, "project_id", None), | ||
| direction=getattr(openstack_rule, "direction", None), | ||
| ethertype=getattr(openstack_rule, "ethertype", None), | ||
| protocol=getattr(openstack_rule, "protocol", None), | ||
| port_range_min=getattr(openstack_rule, "port_range_min", None), | ||
| port_range_max=getattr(openstack_rule, "port_range_max", None), | ||
| remote_ip_prefix=getattr(openstack_rule, "remote_ip_prefix", None), | ||
| remote_group_id=getattr(openstack_rule, "remote_group_id", None), | ||
| security_group_id=getattr( | ||
| openstack_rule, "security_group_id", None | ||
| ), | ||
| ) |
There was a problem hiding this comment.
SecurityGroupRule(**openstack_rule) 로 간소화 시킬 수 있을 것 같은데 확인부탁드립니다. 로직으로만 봤을 때_convert_to_security_group_rule_model 함수가 필요 없을 것 같아요
There was a problem hiding this comment.
테스트해보니 잘 되네요. 이부분은 정리하도록 하겠습니다.
| """ | ||
| conn = get_openstack_conn() | ||
| current = conn.network.get_port(port_id) | ||
| current_group_ids: list[str] = list(current.security_group_ids or []) |
There was a problem hiding this comment.
@platanus-kr
민철님! security rule 관련 코드를 잘 작성해주셔서 감사합니다~!
get_port 함수 결과가 없을 경우 혹시나 이미 빈 배열을 반환하진 않을까 하는 생각에 sdk코드를 찾아보았는데요,
port_id를 찾을 수 없다면 NotFoundException 예외를 발생시키더라구요
그래서 제 생각엔 sdk에서 넘어온 Exception때문에 프로그램이 or [] 실행되기 전에 이미 중단되므로, try catch로 호출부에서 예외를 잡고 처리하거나.. 아니면 or []를 지우고 404를만나면 그대로 실행이 중단되는 방향으로 가면 좋을 것 같은데 어떻게 생각하시나요?
(상세)-------
- _proxy.py파일에서 get_port함수가 _get함수 호출
- proxy.py파일에서 _get함수가 fetch함수 호출
- resource.py파일에서 fetch함수가 _translate_response함수 호출
- resource.py파일에서 _translate_response함수가 exceptions.raise_from_response 호출
- 404면 raise NotFoundException
There was a problem hiding this comment.
sdk 에서 발생하는 이슈를 위로 던지도록 하겠습니다!
| """ | ||
| conn = get_openstack_conn() | ||
| current = conn.network.get_port(port_id) | ||
| current_group_ids: list[str] = list(current.security_group_ids or []) |
There was a problem hiding this comment.
or [] 에 대해 위 코멘트와 똑같이 생각합니다!
c5ce45d to
a666530
Compare
| mcp.tool()(self.get_port_allowed_address_pairs) | ||
| mcp.tool()(self.set_port_binding) | ||
| mcp.tool()(self.add_port_to_security_group) | ||
| mcp.tool()(self.remove_security_group_from_port) |
There was a problem hiding this comment.
add_port_to_security_group 부분과 동일하게 remove_port_from_security_group (예시) 로 변경하면 좋을 것 같습니다.
S0okJu
left a comment
There was a problem hiding this comment.
create_security_group_rules_bulk 도구를 제작하신 이유에 대해 설명 부탁드립니다!
| security_group_id=security_group_id, | ||
| direction=direction, | ||
| ethertype=ethertype, | ||
| **create_args, |
There was a problem hiding this comment.
인자 전달 방식이 일관적이였으면 좋겠습니다. security_group_id, direction, ethertype 이 필수 필드여서 positional 한 인자로 전달한걸로 보이는데, 기능상 차이가 없으니 전체를 키워드 인자로 전달하는게 더 깔끔해보입니다
| """ | ||
| Create multiple Security Group Rules in bulk. | ||
|
|
||
| Each rule dict should follow Neutron SG rule schema keys (e.g., |
There was a problem hiding this comment.
맥락정보로 전달되기 때문에 SG 같은 약자가 의미 표현에 좋지 않을 것 같습니다
| def create_security_group_rules_bulk( | ||
| self, | ||
| rules: list[dict], | ||
| ) -> list[SecurityGroupRule]: | ||
| """ | ||
| Create multiple Security Group Rules in bulk. | ||
|
|
||
| Each rule dict should follow Neutron SG rule schema keys (e.g., | ||
| security_group_id, direction, ethertype, protocol, port_range_min, | ||
| port_range_max, remote_ip_prefix, remote_group_id, description, project_id). | ||
|
|
||
| :param rules: List of rule dictionaries | ||
| :return: List of created SecurityGroupRule models | ||
| """ | ||
| conn = get_openstack_conn() | ||
|
|
||
| def _clean_rule_payload(raw: dict) -> dict: | ||
| payload = dict(raw) | ||
| # Remove port ranges when protocol is absent (Neutron requirement) | ||
| if not payload.get("protocol"): | ||
| payload.pop("port_range_min", None) | ||
| payload.pop("port_range_max", None) | ||
| else: | ||
| # Coerce possible string ports to integers | ||
| for key in ("port_range_min", "port_range_max"): | ||
| if key in payload: | ||
| coerced = self._coerce_port(payload.get(key)) | ||
| if coerced is None: | ||
| payload.pop(key, None) | ||
| else: | ||
| payload[key] = coerced | ||
| # Drop keys explicitly set to None | ||
| return {k: v for k, v in payload.items() if v is not None} |
There was a problem hiding this comment.
pydantic 모델로 인자정보를 명시해도 LLM이 인자 타입을 인지하지 못하는지 확인이 필요해 보입니다
| ) | ||
| assert res_toggle.is_admin_state_up is True | ||
|
|
||
| def test_add_remove_security_group_on_port( |
There was a problem hiding this comment.
add, remove 두가지 테스트를 따로 만들어야 합니다. 하나의 테스트에서 검증하는 내용이 너무 많아 테스트의 목적을 알 수 없습니다,
| def _coerce_port(self, value) -> int | None: | ||
| """ | ||
| Coerce a port value that may arrive as a string (e.g., "80") into int. |
There was a problem hiding this comment.
질문 : Coerce라는 단어를 이번에 처음 봤는대 일반적으로 많이 사용되는 단어인가요?
Overview
Key Changes
Related Issues
Additional context